package com.dahuan.tables;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class Table_KafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        //数据来源在kafka
        tableEnv.connect( new Kafka()
                .version( "0.11" )
                .topic( "input" )
                .property( "zookeeper.conner", "Bigdata:2181" )
                .property( "bootstrap.servers", "Bigdata:9092" ) )  //kafka输入时的数据
                .withFormat( new Csv() )
                .withSchema( new Schema() //字段
                        .field( "id", DataTypes.STRING() )
                        .field( "timestamp", DataTypes.BIGINT() )
                        .field( "temperature", DataTypes.DOUBLE() )
                ).createTemporaryTable( "inputTable" );

        Table inputTable = tableEnv.from( "inputTable" );

        //简单转换
        Table resultTable = inputTable.select( "id,temperature" )
                .filter("id === 'sensor_1'");

        //聚合操作
        Table aggTable = inputTable.groupBy( "id" )
                .select( "id,id.count as cnt" );


        //TODO 创建kafka输入表
        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic("output")
                .property("bootstrap.servers", "Bigdata:9092")
                .property("zookeeper.connect", "Bigdata:2181")
        ).withFormat(new Csv())
                .withSchema(new Schema() //字段
                        .field("id", DataTypes.STRING())
                        .field("temperature", DataTypes.DOUBLE())
                ).createTemporaryTable("outputTable");


        //将kafka输入的数据转到Kafka输出的Sink中
        resultTable.insertInto("outputTable");


    }
}
